package com.lefu.risk.storm.generdata;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

/**
 * Launches the data-generation topology on an in-process {@link LocalCluster}.
 *
 * <p>Wiring: the {@code DGSPT} spout feeds the {@code DATEBLT} bolt through a
 * shuffle grouping, and {@code DATEBLT} feeds the {@code RDCBLT} bolt through
 * the custom {@code MyAssignGrouping}. The topology runs for a fixed period
 * and is then killed, after which the local cluster is shut down.
 */
public class DataGenerateTopology {

	/** Name under which the topology is submitted and later killed. */
	private static final String TOPOLOGY_NAME = "dataGenerate";

	/** Component id of the spout. */
	private static final String SPOUT_ID = "DGSPT";

	/** Component id of the first bolt. */
	private static final String DATE_BOLT_ID = "DATEBLT";

	/** Component id of the second bolt. */
	private static final String REDUCE_BOLT_ID = "RDCBLT";

	/** How long the topology is left running before it is killed (one minute). */
	private static final long RUN_DURATION_MILLIS = 1000L * 60;

	/**
	 * Builds the topology, runs it on a local cluster for
	 * {@link #RUN_DURATION_MILLIS} milliseconds, then kills it and shuts the
	 * cluster down.
	 *
	 * @param args command-line arguments; not used
	 */
	public static void main(String[] args) {

		TopologyBuilder builder = new TopologyBuilder();

		builder.setSpout(SPOUT_ID, new ShuffGenerDataSpout());
		// Third argument is the parallelism hint passed to Storm for the bolt.
		builder.setBolt(DATE_BOLT_ID, new DateGenerateBolt(), 15)
				.shuffleGrouping(SPOUT_ID);
		builder.setBolt(REDUCE_BOLT_ID, new ReducerBolt(), 30)
				.customGrouping(DATE_BOLT_ID, new MyAssignGrouping());

		Config conf = new Config();
		LocalCluster localCluster = new LocalCluster();

		try {
			localCluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());

			// Let the topology process data for a while, then stop it. This
			// sequence belongs on the success path: previously it lived in the
			// catch block and therefore only ran when submission had failed.
			Thread.sleep(RUN_DURATION_MILLIS);
			localCluster.killTopology(TOPOLOGY_NAME);
		} catch (InterruptedException e) {
			// Restore the interrupt flag so code further up can observe it;
			// the finally block still tears the cluster down.
			Thread.currentThread().interrupt();
		} catch (RuntimeException e) {
			// Submission or kill failed: report it instead of discarding it.
			e.printStackTrace();
		} finally {
			// Always release the local cluster, whatever happened above.
			localCluster.shutdown();
		}

	}

}
